/*
*  Copyright (c) 2001 Sun Microsystems, Inc.  All rights
*  reserved.
*
*  Redistribution and use in source and binary forms, with or without
*  modification, are permitted provided that the following conditions
*  are met:
*
*  1. Redistributions of source code must retain the above copyright
*  notice, this list of conditions and the following disclaimer.
*
*  2. Redistributions in binary form must reproduce the above copyright
*  notice, this list of conditions and the following disclaimer in
*  the documentation and/or other materials provided with the
*  distribution.
*
*  3. The end-user documentation included with the redistribution,
*  if any, must include the following acknowledgment:
*  "This product includes software developed by the
*  Sun Microsystems, Inc. for Project JXTA."
*  Alternately, this acknowledgment may appear in the software itself,
*  if and wherever such third-party acknowledgments normally appear.
*
*  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
*  must not be used to endorse or promote products derived from this
*  software without prior written permission. For written
*  permission, please contact Project JXTA at http://www.jxta.org.
*
*  5. Products derived from this software may not be called "JXTA",
*  nor may "JXTA" appear in their name, without prior written
*  permission of Sun.
*
*  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
*  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
*  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
*  DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
*  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
*  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
*  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
*  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
*  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
*  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
*  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
*  SUCH DAMAGE.
*  ====================================================================
*
*  This software consists of voluntary contributions made by many
*  individuals on behalf of Project JXTA.  For more
*  information on Project JXTA, please see
*  <http://www.jxta.org/>.
*
**/
package net.jxta.myjxta.plugins.filetransfer;

import net.jxta.id.IDFactory;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.PipeID;
import net.jxta.platform.NetworkManager;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.socket.JxtaSocket;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.*;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sends a given file towards a given receiver inside a specific JXTA peer group.
 * The transfer is done via a {@link JxtaSocket} connection.
 * <p>
 * Wire protocol as implemented by this class: the sender writes the file name (UTF)
 * and the estimated file size (long), then waits for the start offset requested by
 * the receiver (long, {@code -1} meaning "no transfer"). Afterwards the file is sent
 * in chunks of at most {@link #PAYLOADSIZE} bytes. Each chunk is announced with its
 * offset (long) and size (int); the receiver answers with a 16 byte MD5 digest of
 * the chunk followed by the next requested offset (long). A chunk whose digest does
 * not match is sent again. The transfer is terminated with an offset/size pair whose
 * size is {@code 0}.
 * <p>
 * Progress, status and throughput are published as bound properties through
 * {@link #addPropertyChangeListener(PropertyChangeListener)}. An instance is meant
 * to be used for a single transfer: the source stream is closed when {@link #run()}
 * returns.
 *
 * @version $Id: FileSender.java,v 1.5 2007/05/27 16:02:02 nano Exp $
 */
public final class FileSender extends Thread {
    /**
     * Status values published via {@link #TRANSFER_STATUS_PROPERTY}.
     */
    public enum TRANSFER_STATUS_ENUM {
        CONNECTING, WAITING_FOR_RECEIVER, TRANSFERING, FINISHED
    }

    /** Property name for the transfer progress in percent (int, 0..100). */
    public static final String TRANSFER_PROGRESS_PROPERTY = "transferProgress";
    /** Property name for the transfer status (a {@link TRANSFER_STATUS_ENUM}). */
    public static final String TRANSFER_STATUS_PROPERTY = "transferStatus";
    /** Property name for the overall throughput in KByte per second (int). */
    public static final String TRANSFER_THROUGHPUT_KBYTE_PROPERTY = "transferThroughputKByte";

    /**
     * payload size
     */
    private final static int PAYLOADSIZE = 64 * 1024;

    /** Length in bytes of the MD5 digest the receiver sends back for every chunk. */
    private static final int MD5_LENGTH = 16;

    /** Offset value used on the wire to signal "no (further) transfer". */
    private static final long NO_OFFSET = -1;

    private static final Logger LOG = Logger.getLogger(FileSender.class.getName());

    /** Socket connection timeout in milliseconds. */
    private int m_timeout = 5000;

    private final String m_fileName;
    /** Payload bytes acknowledged by the receiver; volatile because it is read from other threads. */
    private volatile long m_bytesTransfered = 0;
    private final File m_file;
    private final long m_estimatedFileSize;

    private int m_progress = 0;

    private final PeerGroup m_peergroup;
    private final PipeAdvertisement m_pipeAdv;
    /** Stream over the source file, or {@code null} if the file is missing or unreadable. */
    private final InputStream m_inputStream;
    PropertyChangeSupport m_changeSupport = new PropertyChangeSupport(this);

    /**
     * Creates a sender for the given file.
     *
     * @param peerGroup    peer group in which the socket connection is opened
     * @param targetPipeID id of the pipe the receiver is listening on
     * @param dataSource   file to send
     * @throws FileNotFoundException if the file exists and is readable but cannot be opened
     */
    public FileSender(PeerGroup peerGroup, PipeID targetPipeID, File dataSource) throws FileNotFoundException {
        m_peergroup = peerGroup;
        m_pipeAdv = FileReceiver.createSocketAdvertisement(targetPipeID);
        m_file = dataSource;
        m_estimatedFileSize = m_file.length();
        m_fileName = m_file.getName();
        if (m_file.exists() && m_file.canRead()) {
            m_inputStream = new FileInputStream(dataSource);
        } else {
            m_inputStream = null;
        }

    }

    /**
     * Registers a listener for the progress, status and throughput properties.
     *
     * @param listener listener to add
     */
    public void addPropertyChangeListener(PropertyChangeListener listener) {
        m_changeSupport.addPropertyChangeListener(listener);
    }

    /**
     * @return number of payload bytes that have been sent and acknowledged
     *         (correct MD5 received) so far
     */
    public long getTransferedBytes() {
        return m_bytesTransfered;
    }

    /**
     * Sets the socket connection timeout.
     *
     * @param m_timeout timeout in milliseconds
     */
    public void setTimeout(int m_timeout) {
        this.m_timeout = m_timeout;
    }


    /**
     * Connects to the receiver and performs the complete transfer.
     * I/O failures are logged; the source stream and the socket are released in
     * every case.
     */
    @Override
    public void run() {
        if (m_inputStream == null) {
            LOG.warning("File does not exist or is not readable: " + m_file);
            return;
        }

        JxtaSocket socket = null;
        try {
            long start = System.currentTimeMillis();
            log("Connecting to the server");
            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.CONNECTING);
            socket = new JxtaSocket(m_peergroup,
                    //no specific peerid
                    null,
                    m_pipeAdv,
                    //connection timeout in milliseconds (default: 5 seconds)
                    m_timeout,
                    // reliable connection
                    true);

            // get the socket output stream
            OutputStream out = socket.getOutputStream();
            DataOutput dos = new DataOutputStream(out);

            // get the socket input stream
            InputStream in = socket.getInputStream();
            DataInput dis = new DataInputStream(in);

            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.WAITING_FOR_RECEIVER);
            dos.writeUTF(m_fileName);
            dos.writeLong(m_estimatedFileSize);
            out.flush();

            long requestedOffset = NO_OFFSET;
            try {
                requestedOffset = dis.readLong();
            } catch (IOException e) {
                // best effort: a receiver that hangs up without answering is treated like a rejection
                LOG.log(Level.FINE, "no start offset received, treating the transfer as rejected", e);
            }

            long bytesSend = 0;
            if (requestedOffset != NO_OFFSET) {
                m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.TRANSFERING);
                bytesSend = sendChunks(out, dos, dis, requestedOffset);
            } else {
                log("other side rejected the transfer");
            }

            out.close();
            in.close();

            // clamp to 1ms so that very fast transfers do not divide by zero
            long elapsed = Math.max(1L, System.currentTimeMillis() - start);
            // scale before dividing, otherwise the bytes-per-millisecond value is truncated first
            int throughputKBytes = (int) (bytesSend * 1000L / elapsed / 1024L);
            m_changeSupport.firePropertyChange(FileSender.TRANSFER_THROUGHPUT_KBYTE_PROPERTY, 0, throughputKBytes);

            JxtaSocket connected = socket;
            socket = null; // closed right below, nothing left to do in finally
            connected.close();
            log("Socket connection closed");
            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.FINISHED);
            finish();
        } catch (IOException io) {
            LOG.log(Level.WARNING, "File transfer failed: " + m_fileName, io);
        } catch (NoSuchAlgorithmException e) {
            LOG.log(Level.WARNING, "MD5 digest not available, file transfer aborted: " + m_fileName, e);
        } finally {
            closeQuietly(m_inputStream);
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    LOG.log(Level.FINE, "error while closing the socket", e);
                }
            }
        }
    }

    /**
     * Sends the file content chunk by chunk, starting at the given offset, and
     * finishes with the end-of-transfer marker.
     *
     * @param out         raw socket output stream (used for payload and flushing)
     * @param dos         data view on {@code out} (used for offsets and sizes)
     * @param dis         data view on the socket input stream
     * @param firstOffset file offset at which the receiver wants the transfer to start
     * @return number of bytes counted for the throughput calculation (payload sent plus
     *         digest bytes received)
     * @throws IOException              on any socket or file error
     * @throws NoSuchAlgorithmException if no MD5 implementation is available
     */
    private long sendChunks(OutputStream out, DataOutput dos, DataInput dis, long firstOffset)
            throws IOException, NoSuchAlgorithmException {
        byte[] out_buf = new byte[PAYLOADSIZE];
        byte[] in_buf = new byte[MD5_LENGTH];
        MessageDigest md5 = MessageDigest.getInstance("MD5");

        long bytesSend = 0;
        long offset = firstOffset;
        // the stream is positioned at 0, so this skips to the requested start offset
        int bytesRead = readFile(0, offset, out_buf);
        while (bytesRead != -1) {
            // digest() resets the MessageDigest, so the expected value is recomputed from the
            // buffer on every pass; that keeps the comparison valid when a chunk is sent again
            md5.update(out_buf, 0, bytesRead);
            byte[] expectedDigest = md5.digest();

            log("sending offset: " + offset);
            dos.writeLong(offset);
            log("sending size:" + bytesRead);
            dos.writeInt(bytesRead);
            out.flush();
            log("sending data");
            out.write(out_buf, 0, bytesRead);
            bytesSend += bytesRead;
            out.flush();
            log("waiting for md5");
            dis.readFully(in_buf);
            log("waiting for next offset request");
            long requestedOffset = dis.readLong();
            bytesSend += in_buf.length;

            if (!Arrays.equals(in_buf, expectedDigest)) {
                log("md5 checksum wrong, resending offset " + offset);
                continue;
            }

            log("md5 checksum correct:" + offset);
            m_bytesTransfered = m_bytesTransfered + bytesRead; // single writer thread
            offset = offset + bytesRead;
            updateProgress(offset);

            if (requestedOffset == NO_OFFSET) {
                // NOTE(review): assumes -1 means the receiver wants no more data - confirm against
                // FileReceiver. Continuing here would send the previous buffer again.
                break;
            }

            bytesRead = readFile(offset, requestedOffset, out_buf);
            if (bytesRead != -1) {
                offset = requestedOffset;
            }
        }
        dos.writeLong(offset); //should be the overall filesize
        dos.writeInt(0); //end of transfer
        return bytesSend;
    }

    private static void log(String s) {
        LOG.fine(s);
    }

    /**
     * Closes the given resource, logging (not propagating) a failure.
     *
     * @param resource resource to close, may be {@code null}
     */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            LOG.log(Level.FINE, "error while closing a resource", e);
        }
    }

    /**
     * Reads the next chunk of the file into the given buffer.
     *
     * @param offset          current position of the file stream
     * @param requestedOffset position at which the chunk should start
     * @param out_buf         buffer that receives the data
     * @return number of bytes read, or {@code -1} at the end of the file or if
     *         {@code requestedOffset} lies before {@code offset} (a plain input stream
     *         cannot go backwards)
     * @throws IOException if the file cannot be read
     */
    private int readFile(long offset, long requestedOffset, byte[] out_buf) throws IOException {
        if (requestedOffset < offset) {
            return -1;   //not possible with input streams
        }

        // skip() is allowed to skip fewer bytes than requested, so repeat until done
        long remaining = requestedOffset - offset;
        while (remaining > 0) {
            long skipped = m_inputStream.skip(remaining);
            if (skipped <= 0) {
                // no progress: probe with a single read to tell "end of file" from "try again"
                if (m_inputStream.read() == -1) {
                    return -1;
                }
                skipped = 1;
            }
            remaining -= skipped;
        }

        return m_inputStream.read(out_buf, 0, out_buf.length);
    }

    /**
     * Releases the registered listeners after a completed transfer. The support
     * object itself stays in place so that late calls to
     * {@link #addPropertyChangeListener(PropertyChangeListener)} do not fail.
     */
    private void finish() {
        PropertyChangeListener[] listeners = m_changeSupport.getPropertyChangeListeners();
        for (int i = 0; i < listeners.length; i++) {
            m_changeSupport.removePropertyChangeListener(listeners[i]);
        }
    }

    /**
     * Recomputes the progress in percent and notifies the listeners if it changed.
     *
     * @param offset file offset up to which the data has been acknowledged
     */
    private void updateProgress(long offset) {
        int newProgress;
        if (m_estimatedFileSize <= 0) {
            // nothing to compare against (empty or unknown size): any acknowledged data counts as done
            newProgress = offset > 0 ? 100 : 0;
        } else {
            newProgress = Math.round(100 * ((float) offset / (float) m_estimatedFileSize));
            // the file may have grown since its size was estimated
            newProgress = Math.max(0, Math.min(100, newProgress));
        }
        if (newProgress != m_progress) {
            int oldProgress = m_progress;
            m_progress = newProgress;
            m_changeSupport.firePropertyChange(TRANSFER_PROGRESS_PROPERTY, oldProgress, m_progress);
        }
    }

    /**
     * Stand-alone entry point: starts a JXTA edge peer and sends one file to the
     * receiver listening on a fixed, well-known pipe id.
     *
     * @param args first parameter is the filename which should be send to the other side.
     */
    public static void main(String args[]) {
        if (args.length < 1) {
            System.err.println("Usage: " + FileSender.class.getName() + " <file>");
            System.exit(-1);
        }

        System.setProperty("net.jxta.logging.Logging", Level.INFO.getName());
        NetworkManager manager = null;
        try {
            manager = new NetworkManager(NetworkManager.ConfigMode.EDGE,
                    "FileSender", new File(new File(".cache"), "FileSender").toURI());
            manager.setUseDefaultSeeds(true);
            manager.startNetwork();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }

        try {
            Thread.currentThread().setName(FileSender.class.getName() + ".main()");
            log("Waiting for Rendezvous...");
            if (!manager.waitForRendezvousConnection(30 * 1000)) {
                log("unable to connect to Rendezvous");
            } else {
                log("Rendezvous connection established");
            }
            String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503393B5C2F6CA7A41FBB0F890173088E79404";
            PipeID socketID = (PipeID) IDFactory.fromURI(new URI(SOCKETIDSTR));
            File sourceFile = new File(args[0]);

            FileSender socEx = new FileSender(manager.getNetPeerGroup(), socketID, sourceFile);

            //the bean communicates with us via a property change listener
            socEx.addPropertyChangeListener(new PropertyChangeListener() {
                public void propertyChange(PropertyChangeEvent propertyChangeEvent) {
                    System.out.println(propertyChangeEvent.getPropertyName() + ":" + propertyChangeEvent.getNewValue());
                }
            });
            socEx.setTimeout(30 * 1000);

            //start the file-transfer (blocking call, deliberately run() and not start())
            socEx.run();

            manager.stopNetwork();
        } catch (Throwable e) {
            System.out.flush();
            System.err.println("Failed : " + e);
            e.printStackTrace(System.err);
            System.exit(-1);
        }
    }
}

